package org.zjvis.dp.data.lineage.util;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.BeanUtils;
import org.zjvis.dp.data.lineage.data.Place;
import org.zjvis.dp.data.lineage.data.Position;
import org.zjvis.dp.data.lineage.exception.DataLineageException;

/**
 * @author zhouyu
 * @create 2023-04-25 11:06
 */
public class DataLineageUtil {
    /**
     * 对边里面的任务id进行去重
     * @param keyExtractor
     * @return
     * @param <T>
     */
    public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
        Set<Object> seen = ConcurrentHashMap.newKeySet();
        return t -> seen.add(keyExtractor.apply(t));
    }

    /**
     * 判断是不是 'ab' "a, b"
     * @param name
     * @return
     */
    public static boolean isString(String name) {
        if(StringUtils.isEmpty(name)) {
            return false;
        }

        char startChar = name.charAt(0);
        char endChar = name.charAt(name.length() - 1);

        if(startChar == '"' && endChar == '"') {
            return true;
        }

        if(startChar == '\'' && endChar == '\'') {
            return true;
        }

        return false;
    }


    public static <T> List<T> castList(Object obj, Class<T> clazz) {
        List<T> result = Lists.newArrayList();
        if(obj instanceof List<?>)
        {
            for (Object o : (List<?>) obj)
            {
                result.add(clazz.cast(o));
            }
            return result;
        } else {
            throw  new DataLineageException("cast type failed, pls check");
        }
    }

    public static <T> List<T> deepCopyList(List<T> sourceList, Class<T> tClass, String[] ignoreProperties) {
        List<T> result = Lists.newArrayList();
        for(T element : sourceList) {
            try {
                T target = tClass.newInstance();
                BeanUtils.copyProperties(element, target, ignoreProperties);
                result.add(target);
            } catch (IllegalAccessException | InstantiationException e) {
                throw new DataLineageException("deep copy list failed");
            }
        }
        return result;
    }

    public static boolean onlySemicolonsOrNewLine(String str) {
        for(int i = 0; i < str.length(); i++) {
            if(str.charAt(i) != '\n' && str.charAt(i) != ';') {
                return false;
            }
        }
        return true;
    }

    public static boolean isCommentLine(String str) {
        str = str.trim();
        return str.startsWith("--");
    }

    public static boolean isSpecificLine(String str) {
        return onlySemicolonsOrNewLine(str) || isCommentLine(str);
    }

    public static final String SELECT_VALUE = "SELECT";
    public static final String FROM_VALUE = "FROM";

    public static List<List<Place>> extractFieldPlace(String sql, int startLineNumber) {
        String[] sqlArr = sql.split("\\n");

        List<List<Place>> result = Lists.newArrayList();
        List<Place> placeList = Lists.newArrayList();
        boolean isField = false;

        //找到SELECT、 FROM位置
        Map<String, Boolean> fieldFlagMap = Maps.newHashMap();
        for(int i = 0; i < sqlArr.length; i ++) {
            if (isSpecificLine(sqlArr[i])) {
                continue;
            }
            StringBuilder word = new StringBuilder();
            for(int j  = 0; j < sqlArr[i].length(); j++) {
                char ch = sqlArr[i].charAt(j);
                if (isSpecificChar(ch)) {
                    if (word.length() > 0) {
                        isSelectOrFromFlag(fieldFlagMap, sqlArr, word.toString(), i, j - 1);
                        word = new StringBuilder();
                    }
                } else {
                    word.append(ch);
                }

                if (j == sqlArr[i].length() - 1 && word.length() > 0) {
                    isSelectOrFromFlag(fieldFlagMap, sqlArr, word.toString(), i, j);
                }
            }
        }

        List<Position> positionList = Lists.newArrayList();
        for (int i = 0; i < sqlArr.length; i++) {
            if (isSpecificLine(sqlArr[i])) {
                continue;
            }

            for (int j = 0; j < sqlArr[i].length(); j++) {
                char ch = sqlArr[i].charAt(j);
                if (isField) {
                    if (ch == ',') {
                        Place place = getPlace(positionList);
                        if(Objects.nonNull(place)) {
                            placeList.add(place);
                        }
                        positionList = Lists.newArrayList();
                    } else {
                        positionList.add(
                                Position.builder()
                                        .line(startLineNumber + i)
                                        .ch(j)
                                        .character(sqlArr[i].charAt(j))
                                        .build()
                        );
                    }
                }

                String key = String.format("%d_%d", i, j);
                if(fieldFlagMap.containsKey(key)) {
                    if(fieldFlagMap.get(key).equals(Boolean.TRUE)) {
                        isField = true;
                    }

                    if(fieldFlagMap.get(key).equals(Boolean.FALSE)) {
                        isField = false;

                        Place place = getPlace(positionList);
                        if(Objects.nonNull(place)) {
                            placeList.add(place);
                        }
                        positionList = Lists.newArrayList();

                        result.add(placeList);
                        placeList = Lists.newArrayList();
                    }
                }
            }
        }

        return result;
    }

    private static boolean isSpecificChar(char ch) {
        return ch == ',' || ch == ';' || ch == ' ' || ch == '\t' || ch == '(' || ch == ')';
    }

    private static Pair<Integer, Integer> getLRBlackSpaceSize(List<Position> positionList) {
        int leftSize = 0;
        int rightSize = 0;
        for (Position position : positionList) {
            char character = position.getCharacter();
            if (character == ' ' || character == '\t') {
                leftSize++;
            } else {
                break;
            }
        }

        for(int i = positionList.size() - 1; i  >= 0; i--) {
            char character = positionList.get(i).getCharacter();
            if(character == ' ' || character == '\t') {
                rightSize++;
            } else {
                break;
            }
        }
        return Pair.of(leftSize, rightSize);
    }


    private static Place getPlace(List<Position> positionList) {
        if(CollectionUtils.isEmpty(positionList)) {
            return null;
        }

        Pair<Integer, Integer> pair = getLRBlackSpaceSize(positionList);
        if (pair.getLeft() == positionList.size() || pair.getRight() == positionList.size()) {
            return null;
        }
        positionList = positionList.subList(pair.getLeft(), positionList.size() - pair.getRight());

        Position first = positionList.get(0);
        Position end = positionList.get(positionList.size() - 1);

        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < positionList.size(); i++) {
            stringBuilder.append(positionList.get(i).getCharacter());
            if (i != positionList.size() - 1 &&
                    !Objects.equals(positionList.get(i).getLine(), positionList.get(i + 1).getLine())) {
                stringBuilder.append("\n");
            }
        }

        Position left = Position.builder()
                .line(first.getLine())
                .ch(first.getCh())
                .character(first.getCharacter())
                .build();
        //为了兼容，最后位置需要后移一位
        Position right = Position.builder()
                .line(end.getLine())
                .ch(end.getCh() + 1)
                .character(end.getCharacter())
                .build();
        return Place.builder()
                .from(left)
                .to(right)
                .fieldStr(stringBuilder.toString())
                .build();
    }

    private static void isSelectOrFromFlag(
            Map<String, Boolean> fieldFlagMap,
            String[] sqlArr,
            String word,
            int i,
            int j
    ) {
        if (StringUtils.equalsIgnoreCase(word, SELECT_VALUE)) {
            fieldFlagMap.put(String.format("%d_%d", i, j), Boolean.TRUE);
        }

        if (StringUtils.equalsIgnoreCase(word, FROM_VALUE)) {
            if(j - FROM_VALUE.length() < 0) {
                fieldFlagMap.put(String.format("%d_%d", i - 1, sqlArr[i-1].length() -1), Boolean.FALSE);
            } else {
                fieldFlagMap.put(String.format("%d_%d", i, j - FROM_VALUE.length()), Boolean.FALSE);
            }
        }
    }
}
